1 /**
2 * Copyright 2014 Netflix, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package rx.internal.operators;
17
18 import static rx.Observable.concat;
19 import static rx.Observable.just;
20 import static rx.Observable.zip;
21 import rx.Observable;
22 import rx.functions.Func1;
23 import rx.functions.Func2;
24 import rx.internal.util.UtilityFunctions;
25
26 /**
27 * Returns an {@link Observable} that emits a single {@code Boolean} value that indicates whether two source
28 * {@code Observable}s emit sequences of items that are equivalent to each other.
29 */
30 public final class OperatorSequenceEqual {
31 private OperatorSequenceEqual() {
32 throw new IllegalStateException("No instances!");
33 }
34
35 /** NotificationLite doesn't work as zip uses it. */
36 private static final Object LOCAL_ONCOMPLETED = new Object();
37 static <T> Observable<Object> materializeLite(Observable<T> source) {
38 return concat(
39 source.map(new Func1<T, Object>() {
40
41 @Override
42 public Object call(T t1) {
43 return t1;
44 }
45
46 }), just(LOCAL_ONCOMPLETED));
47 }
48
49 /**
50 * Tests whether two {@code Observable} sequences are identical, emitting {@code true} if both sequences
51 * complete without differing, and {@code false} if the two sequences diverge at any point.
52 *
53 * @param first
54 * the first of the two {@code Observable}s to compare
55 * @param second
56 * the second of the two {@code Observable}s to compare
57 * @param equality
58 * a function that tests emissions from each {@code Observable} for equality
59 * @return an {@code Observable} that emits {@code true} if {@code first} and {@code second} complete
60 * after emitting equal sequences of items, {@code false} if at any point in their sequences the
61 * two {@code Observable}s emit a non-equal item.
62 */
63 public static <T> Observable<Boolean> sequenceEqual(
64 Observable<? extends T> first, Observable<? extends T> second,
65 final Func2<? super T, ? super T, Boolean> equality) {
66 Observable<Object> firstObservable = materializeLite(first);
67 Observable<Object> secondObservable = materializeLite(second);
68
69 return zip(firstObservable, secondObservable,
70 new Func2<Object, Object, Boolean>() {
71
72 @Override
73 @SuppressWarnings("unchecked")
74 public Boolean call(Object t1, Object t2) {
75 boolean c1 = t1 == LOCAL_ONCOMPLETED;
76 boolean c2 = t2 == LOCAL_ONCOMPLETED;
77 if (c1 && c2) {
78 return true;
79 }
80 if (c1 || c2) {
81 return false;
82 }
83 // Now t1 and t2 must be 'onNext'.
84 return equality.call((T)t1, (T)t2);
85 }
86
87 }).all(UtilityFunctions.<Boolean> identity());
88 }
89 }